Source code for hysop.numerics.fft.numpy_fft

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
FFT iterface for fast Fourier Transforms using C fftpack fork (using numpy).
:class:`~hysop.numerics.NumpyFFT`
:class:`~hysop.numerics.NumpyFFTPlan`
"""

import numpy as np
from numpy import fft as _FFT

from hysop.tools.htypes import first_not_None
from hysop.numerics.fft.host_fft import HostFFTPlanI, HostFFTI, HostArray
from hysop.numerics.fft.fft import (
    complex_to_float_dtype,
    float_to_complex_dtype,
    mk_view,
    mk_shape,
)


[docs] def dct(a, out=None, type=2, axis=-1): ndim = a.ndim shape = a.shape N = a.shape[axis] if type == 1: # O(sqrt(log(N))) error, O(2N) complexity, O(4*N) memory slc0 = mk_view(ndim, axis, 1, -1) slc1 = mk_view(ndim, axis, None, None, -1) s0 = mk_shape(shape, axis, 2 * N - 2) X = np.empty(shape=s0, dtype=a.dtype) np.concatenate((a, a[slc0][slc1]), axis=axis, out=X) res = _FFT.rfft(X, axis=axis).real if out is None: out = res else: assert out.shape == res.shape out[...] = res elif type == 2: # O(sqrt(log(N))) error, O(N) complexity, O(3N) memory n0 = N // 2 + 1 n1 = (N - 1) // 2 + 1 slc0 = mk_view(ndim, axis, 1, None, None) slc1 = mk_view(ndim, axis, None, None, +2) slc2 = mk_view(ndim, axis, 1, None, +2) slc3 = mk_view(ndim, axis, None, None, -1) slc4 = mk_view(ndim, axis, None, None, None, default=None) slc5 = mk_view(ndim, axis, None, n1, None) slc6 = mk_view(ndim, axis, n1, None, None) X = np.empty_like(a) np.concatenate((a[slc1], a[slc2][slc3]), axis=axis, out=X) X = _FFT.rfft(X, axis=axis) X *= (2 * np.exp(-1j * np.pi * np.arange(n0) / (2 * N)))[slc4] if out is None: out = np.empty_like(a) else: assert out.shape == a.shape out[slc5] = +X.real[slc5] out[slc6] = -X.imag[slc0][slc3] elif type == 3: # O(sqrt(log(N))) error, O(N) complexity, O(3N) memory ctype = float_to_complex_dtype(a.dtype) n0 = N // 2 + 1 n1 = (N - 1) // 2 + 1 slc0 = mk_view(ndim, axis, None, n0, None) slc1 = mk_view(ndim, axis, 1, None, None) slc2 = mk_view(ndim, axis, n1, None, None) slc3 = mk_view(ndim, axis, None, None, -1) slc4 = mk_view(ndim, axis, None, None, +2) slc5 = mk_view(ndim, axis, None, n1, None) slc6 = mk_view(ndim, axis, 1, None, +2) slc7 = mk_view(ndim, axis, None, None, None, default=None) s0 = mk_shape(shape, axis, n0) X = np.zeros(shape=s0, dtype=ctype) X.real[slc0] = +a[slc0] X.imag[slc1] = -a[slc2][slc3] X *= np.exp(+1j * np.pi * np.arange(n0) / (2 * N))[slc7] X = _FFT.irfft(X, axis=axis, n=N) X *= N if out is None: out = np.empty_like(a) else: assert out.shape == a.shape out[slc4] = X[slc5] out[slc6] = X[slc2][slc3] else: stypes = ["I", "II", "III", "IV", "V", "VI", "VII", "VIII"] msg = "DCT-{} has not been implemented yet." msg = msg.format(stypes[type - 1]) raise NotImplementedError(msg) return out
[docs] def dst(a, out=None, type=2, axis=-1): ndim = a.ndim shape = a.shape N = a.shape[axis] if type == 1: # O(sqrt(log(N))) error, O(2N) complexity, O(4*N) memory slc0 = mk_view(ndim, axis, None, None, -1) slc1 = mk_view(ndim, axis, 1, -1, None) s0 = mk_shape(shape, axis, 2 * N + 2) s1 = mk_shape(shape, axis, 1) X = np.empty(shape=s0, dtype=a.dtype) Z = np.zeros(shape=s1, dtype=a.dtype) np.concatenate((Z, -a, Z, a[slc0]), axis=axis, out=X) res = _FFT.rfft(X, axis=axis).imag if out is None: out = np.empty_like(a) else: assert out.shape == a.shape out[...] = res[slc1] elif type == 2: # O(sqrt(log(N))) error, O(N) complexity, O(3N) memory n0 = N // 2 + 1 n1 = (N - 1) // 2 + 1 slc0 = mk_view(ndim, axis, 1, None, None) slc1 = mk_view(ndim, axis, None, None, +2) slc2 = mk_view(ndim, axis, 1, None, +2) slc3 = mk_view(ndim, axis, None, None, -1) slc4 = mk_view(ndim, axis, None, None, None, default=None) slc5 = mk_view(ndim, axis, None, n1 - 1, None) slc6 = mk_view(ndim, axis, n1 - 1, None, None) slc7 = mk_view(ndim, axis, 1, n1, None) X = np.empty_like(a) np.concatenate((a[slc1], -a[slc2][slc3]), axis=axis, out=X) X = _FFT.rfft(X, axis=axis) X *= (2 * np.exp(-1j * np.pi * np.arange(n0) / (2 * N)))[slc4] if out is None: out = np.empty_like(a) else: assert out.shape == a.shape out[slc5] = -X.imag[slc7] out[slc6] = +X.real[slc3] elif type == 3: # O(sqrt(log(N))) error, O(N) complexity, O(3N) memory ctype = float_to_complex_dtype(a.dtype) n0 = N // 2 + 1 n1 = (N - 1) // 2 + 1 slc0 = mk_view(ndim, axis, None, n0, None) slc1 = mk_view(ndim, axis, None, None, -1) slc2 = mk_view(ndim, axis, 1, None, None) slc3 = mk_view(ndim, axis, None, N - n1, None) slc4 = mk_view(ndim, axis, None, None, None, default=None) slc5 = mk_view(ndim, axis, None, None, 2) slc6 = mk_view(ndim, axis, None, n1, None) slc7 = mk_view(ndim, axis, 1, None, 2) slc8 = mk_view(ndim, axis, n1, None, None) s0 = mk_shape(shape, axis, n0) X = np.zeros(shape=s0, dtype=ctype) X.real[slc0] = +a[slc1][slc0] X.imag[slc2] = -a[slc3] X *= np.exp(+1j * np.pi * np.arange(n0) / (2 * N))[slc4] X = _FFT.irfft(X, axis=axis, n=N) X[...] *= N if out is None: out = np.empty_like(a) else: assert out.shape == a.shape out[slc5] = +X[slc6] out[slc7] = -X[slc8][slc1] else: stypes = ["I", "II", "III", "IV", "V", "VI", "VII", "VIII"] msg = "DCT-{} has not been implemented yet." msg = msg.format(stypes[type - 1]) raise NotImplementedError(msg) return out
[docs] def idct(a, out=None, type=2, axis=-1, **kwds): itype = [1, 3, 2, 4][type - 1] return dct(a=a, out=out, type=itype, axis=axis, **kwds)
[docs] def idst(a, out=None, type=2, axis=-1, **kwds): itype = [1, 3, 2, 4][type - 1] return dst(a=a, out=out, type=itype, axis=axis, **kwds)
[docs] class NumpyFFTPlan(HostFFTPlanI): """ Wrap a numpy fft call (numpy.fft does not offer real planning capabilities). """ def __init__(self, fn, a, out, scaling=None, **kwds): super().__init__() self.fn = fn self.a = a self.out = out self.scaling = scaling if isinstance(a, HostArray): a = a.handle if isinstance(out, HostArray): out = out.handle kwds = kwds.copy() kwds["a"] = a if fn in (dct, idct, dst, idst): kwds["out"] = out self.kwds = kwds @property def input_array(self): return self.a @property def output_array(self): return self.out
[docs] def execute(self): out = self.out scaling = self.scaling if self.fn in (dct, idct, dst, idst): self.fn(**self.kwds) else: out[...] = self.fn(**self.kwds) if scaling is not None: out[...] *= scaling
[docs] class NumpyFFT(HostFFTI): """ Interface to compute local to process FFT-like transforms using the numpy fft backend. Numpy fft backend has many disadvantages: - only double precision is really supported - single precision is supported by casting to double precision - creates intermediate temporary buffers at each call - no planning capabilities (numpy.fft methods are just wrapped into fake plans) The only advantage is that planning won't destroy original inputs. """ def __init__(self, backend=None, allocator=None, warn_on_allocation=True, **kwds): super().__init__( backend=backend, allocator=allocator, warn_on_allocation=warn_on_allocation, **kwds, ) self.supported_ftypes = ( np.float32, np.float64, ) self.supported_ctypes = ( np.complex64, np.complex128, )
[docs] def fft(self, a, out=None, axis=-1, **kwds): (shape, dtype) = super().fft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan(fn=_FFT.fft, a=a, out=out, axis=axis, **kwds) return plan
[docs] def ifft(self, a, out=None, axis=-1, **kwds): (shape, dtype, s) = super().ifft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan(fn=_FFT.ifft, a=a, out=out, axis=axis, **kwds) return plan
[docs] def rfft(self, a, out=None, axis=-1, **kwds): (shape, dtype) = super().rfft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan(fn=_FFT.rfft, a=a, out=out, axis=axis, **kwds) return plan
[docs] def irfft(self, a, out=None, n=None, axis=-1, **kwds): (shape, dtype, s) = super().irfft(a=a, out=out, n=n, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan( fn=_FFT.irfft, a=a, out=out, axis=axis, n=shape[axis], **kwds ) return plan
[docs] def dct(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype) = super().dct(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan(fn=dct, a=a, out=out, axis=axis, type=type, **kwds) return plan
[docs] def idct(self, a, out=None, type=2, axis=-1, scaling=None, **kwds): (shape, dtype, _, s) = super().idct(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan( fn=idct, a=a, out=out, axis=axis, type=type, scaling=first_not_None(scaling, 1.0 / s), **kwds, ) return plan
[docs] def dst(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype) = super().dst(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan(fn=dst, a=a, out=out, axis=axis, type=type, **kwds) return plan
[docs] def idst(self, a, out=None, type=2, axis=-1, scaling=None, **kwds): (shape, dtype, _, s) = super().idst(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) plan = NumpyFFTPlan( fn=idst, a=a, out=out, axis=axis, type=type, scaling=first_not_None(scaling, 1.0 / s), **kwds, ) return plan